Graph Analytics in Spark
1 Algorithms over graphs
GraphFrames provides parallel implementations of a set of state-of-the-art algorithms for graph analytics:
- Breadth first search
- Shortest paths
- Connected components
- Strongly connected components
- Label propagation
- PageRank
- …
Custom algorithms can also be designed and implemented.
1.1 Checkpoint directory
To run some expensive algorithms, set a checkpoint directory that stores the state of the job at every iteration. This allows the job to resume from where it left off if it crashes. Set the checkpoint directory with sc.setCheckpointDir(graphframes_ckpts_dir), where:
- graphframes_ckpts_dir is the path of the new checkpoint folder
- sc is the SparkContext object (retrieve it from a SparkSession by using spark.sparkContext)
1.2 Breadth first search
Breadth-first search (BFS) is an algorithm for traversing or searching graph data structures: it finds the shortest path(s) from one vertex (or a set of vertexes) to another vertex (or a set of vertexes). It is used as a building block in many other algorithms, for example:
- Length of shortest paths
- Connected components
- …
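As a rough illustration of the idea, independent of GraphFrames, the following plain-Python sketch runs a level-by-level BFS on a directed graph and returns every shortest path from a set of start vertexes to a set of target vertexes. The function name `bfs_shortest_paths` and the adjacency-dictionary layout are assumptions made for this example, not part of the GraphFrames API; the edges are those of the example graph used later in this section.

```python
def bfs_shortest_paths(adj, starts, targets, max_path_length=10):
    """Return every shortest path (as a list of vertex ids) from a start to a target."""
    targets = set(targets)
    frontier = [[s] for s in starts]   # all paths in the frontier have the same length
    visited = set(starts)              # vertexes reached at an earlier level
    for _ in range(max_path_length + 1):
        found = [path for path in frontier if path[-1] in targets]
        if found:
            return found               # the first level that hits a target is the shortest
        next_frontier = []
        for path in frontier:
            for nxt in adj.get(path[-1], []):
                if nxt not in visited:
                    next_frontier.append(path + [nxt])
        visited.update(path[-1] for path in next_frontier)
        frontier = next_frontier
        if not frontier:
            break
    return []


# Directed edges of the example graph (src -> list of dst).
adj = {
    "u1": ["u2", "u5"],
    "u2": ["u3"],
    "u3": ["u2"],
    "u4": ["u1"],
    "u5": ["u6", "u4"],
    "u6": ["u3"],
}

paths = bfs_shortest_paths(adj, ["u5"], ["u3"])
print(paths)  # [['u5', 'u6', 'u3']]
```

The sketch keeps whole paths in the frontier, which is convenient for a small graph but would not scale; a distributed implementation has to organize the search differently.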
1.2.1 Implementation
The bfs() method of the GraphFrame class returns the shortest path(s) from the vertexes matching the expression fromExpr to the vertexes matching the expression toExpr. If many vertexes match fromExpr and toExpr, only the pair(s) connected by the shortest path are returned. The parameters are:
- fromExpr: Spark SQL expression specifying valid starting vertexes for the execution of the BFS algorithm (e.g., to start from a specific vertex: “id = [start vertex id]”);
- toExpr: Spark SQL expression specifying valid target vertexes for the BFS algorithm;
- maxPathLength: limit on the length of paths (default = 10);
- edgeFilter: Spark SQL expression specifying edges that may be used in the search (default None).
bfs() returns a DataFrame containing the selected shortest path(s). Notice that if multiple paths are valid and their length is equal to the shortest length, the returned DataFrame contains one Row for each path. The number of columns of the returned DataFrame is equal to \(2 \cdot (\text{length of the shortest path}) + 1\).
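To make the column count concrete, here is a small plain-Python sketch that lists the column names expected for a given path length. The helper `bfs_columns` is hypothetical, and the naming pattern (from, e0, v1, e1, ..., to) is extrapolated from the example result shown in this section, so treat it as an assumption for longer paths.

```python
def bfs_columns(path_length):
    """Column names of a BFS result whose shortest paths have `path_length` edges."""
    if path_length < 1:
        raise ValueError("this sketch only covers paths with at least one edge")
    cols = ["from", "e0"]
    for i in range(1, path_length):
        # Each additional hop adds one intermediate vertex and one edge.
        cols += [f"v{i}", f"e{i}"]
    cols.append("to")
    return cols


cols = bfs_columns(2)
print(cols)       # ['from', 'e0', 'v1', 'e1', 'to']
print(len(cols))  # 5, i.e. 2 * 2 + 1
```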
- Find the shortest path from Esther to Charlie
- Store the result in a DataFrame
The content of the returned DataFrame is the following
| from | \(e0\) | \(v1\) | \(e1\) | to |
|---|---|---|---|---|
| \([u5, \text{Esther}, 32]\) | \([u5, u6, \text{follow}]\) | \([u6, \text{Fanny}, 36]\) | \([u6, u3, \text{follow}]\) | \([u3, \text{Charlie}, 30]\) |
from graphframes import GraphFrame
# Vertex DataFrame
v = spark.createDataFrame(
[
("u1", "Alice", 34),
("u2", "Bob", 36),
("u3", "Charlie", 30),
("u4", "David", 29),
("u5", "Esther", 32),
("u6", "Fanny", 36),
("u7", "Gabby", 60)
],
["id", "name", "age"]
)
# Edge DataFrame
e = spark.createDataFrame(
[
("u1", "u2", "friend"),
("u2", "u3", "follow"),
("u3", "u2", "follow"),
("u6", "u3", "follow"),
("u5", "u6", "follow"),
("u5", "u4", "friend"),
("u4", "u1", "friend"),
("u1", "u5", "friend")
],
["src", "dst", "relationship"]
)
# Create the graph
g = GraphFrame(v, e)
# Search from vertex with name = "Esther" to vertex with name = "Charlie"
shortestPaths = g.bfs("name = 'Esther'", "name = 'Charlie'")
- Find the shortest path from Alice to a user who is 30 years old
- Store the result in a DataFrame
from graphframes import GraphFrame
# Vertex DataFrame
v = spark.createDataFrame(
[
("u1", "Alice", 34),
("u2", "Bob", 36),
("u3", "Charlie", 30),
("u4", "David", 29),
("u5", "Esther", 32),
("u6", "Fanny", 36),
("u7", "Gabby", 60)
],
["id", "name", "age"]
)
# Edge DataFrame
e = spark.createDataFrame(
[
("u1", "u2", "friend"),
("u2", "u3", "follow"),
("u3", "u2", "follow"),
("u6", "u3", "follow"),
("u5", "u6", "follow"),
("u5", "u4", "friend"),
("u4", "u1", "friend"),
("u1", "u5", "friend")
],
["src", "dst", "relationship"]
)
# Create the graph
g = GraphFrame(v, e)
# Find the shortest path from Alice to a user who is 30 years old
shortestPaths = g.bfs("name = 'Alice'", "age = 30")
- Find the shortest path from any user who is less than 31 years old to any user who is more than 30 years old
- Store the result in a DataFrame
Notice that two paths are selected in this case
from graphframes import GraphFrame
# Vertex DataFrame
v = spark.createDataFrame(
[
("u1", "Alice", 34),
("u2", "Bob", 36),
("u3", "Charlie", 30),
("u4", "David", 29),
("u5", "Esther", 32),
("u6", "Fanny", 36),
("u7", "Gabby", 60)
],
["id", "name", "age"]
)
# Edge DataFrame
e = spark.createDataFrame(
[
("u1", "u2", "friend"),
("u2", "u3", "follow"),
("u3", "u2", "follow"),
("u6", "u3", "follow"),
("u5", "u6", "follow"),
("u5", "u4", "friend"),
("u4", "u1", "friend"),
("u1", "u5", "friend")
],
["src", "dst", "relationship"]
)
# Create the graph
g = GraphFrame(v, e)
# Find the shortest path from any user who is less than 31 years old
# to any user who is more than 30 years old
shortestPaths = g.bfs("age < 31", "age > 30")
- Find the shortest path from Alice to any user who is less than 31 years old without using follow edges
- Store the result in a DataFrame
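Notice that only one path is selected in this case (Alice → Esther → David): the path to Charlie through Bob is excluded because it uses a follow edge.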
from graphframes import GraphFrame
# Vertex DataFrame
v = spark.createDataFrame(
[
("u1", "Alice", 34),
("u2", "Bob", 36),
("u3", "Charlie", 30),
("u4", "David", 29),
("u5", "Esther", 32),
("u6", "Fanny", 36),
("u7", "Gabby", 60)
],
["id", "name", "age"]
)
# Edge DataFrame
e = spark.createDataFrame(
[
("u1", "u2", "friend"),
("u2", "u3", "follow"),
("u3", "u2", "follow"),
("u6", "u3", "follow"),
("u5", "u6", "follow"),
("u5", "u4", "friend"),
("u4", "u1", "friend"),
("u1", "u5", "friend")
],
["src", "dst", "relationship"]
)
# Create the graph
g = GraphFrame(v, e)
# Find the shortest path from Alice to any user who is less
# than 31 years old without using "follow" edges
shortestPaths = g.bfs("name = 'Alice'", "age < 31", edgeFilter="relationship <> 'follow'")
1.3 Shortest path
The shortest path method computes the length of the shortest path(s) from each vertex to each vertex of a given set of landmark vertexes. It uses the BFS algorithm for computing the shortest paths.
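The following plain-Python sketch shows one way such distances can be obtained: a BFS that starts at each landmark and walks the edges backwards, so that the level of a vertex equals the length of its shortest directed path to the landmark. The function `distances_to_landmarks` is a hypothetical helper written for this illustration and says nothing about how GraphFrames organizes the computation internally; the graph is the example graph of these notes.

```python
from collections import deque


def distances_to_landmarks(vertices, edges, landmarks):
    """For each vertex, map every reachable landmark to its hop distance."""
    # Reverse adjacency: for each vertex, the vertexes that point to it.
    incoming = {v: [] for v in vertices}
    for src, dst in edges:
        incoming[dst].append(src)

    result = {v: {} for v in vertices}
    for lm in landmarks:
        result[lm][lm] = 0
        queue = deque([lm])
        while queue:
            current = queue.popleft()
            for prev in incoming[current]:
                if lm not in result[prev]:
                    # prev reaches lm through current with one more edge.
                    result[prev][lm] = result[current][lm] + 1
                    queue.append(prev)
    return result


vertices = ["u1", "u2", "u3", "u4", "u5", "u6", "u7"]
edges = [
    ("u1", "u2"), ("u2", "u3"), ("u3", "u2"), ("u6", "u3"),
    ("u5", "u6"), ("u5", "u4"), ("u4", "u1"), ("u1", "u5"),
]

dist = distances_to_landmarks(vertices, edges, ["u1", "u4"])
print(dist["u1"])  # {'u1': 0, 'u4': 2}
print(dist["u2"])  # {} because u2 cannot reach any landmark
```

Vertexes that cannot reach a landmark simply have no entry for it, which mirrors the empty maps in the result tables of this section.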
1.3.1 Implementation
The shortestPaths(landmarks) method of the GraphFrame class returns the length of the shortest path(s) from each vertex to a given set of landmark vertexes. For each vertex, one shortest path for each landmark vertex is computed and its length is returned.
- landmarks: list of IDs of landmark vertexes (e.g., \([u1, u4]\))
shortestPaths() returns a DataFrame that contains one record/row for each distinct vertex of the input graph (including the non-connected ones). The DataFrame has the following columns:
- one column for each attribute of the vertexes
- distances (type map): for each landmark lm it contains one pair (ID of lm: length of the shortest path from the vertex of the current record to lm)
- Find for each user the length of the shortest path to user \(u1\) (i.e., Alice)
- Store the result in a DataFrame
| Vertex | Distance to \(u1\) |
|---|---|
| \(u1\) | 0 |
| \(u2\) | - |
| \(u3\) | - |
| \(u4\) | 1 |
| \(u5\) | 2 |
| \(u6\) | - |
| \(u7\) | - |
The content of the returned DataFrame is the following
| id | name | age | distances |
|---|---|---|---|
| \(u1\) | Alice | 34 | \([u1 \rightarrow 0]\) |
| \(u2\) | Bob | 36 | \([\quad]\) |
| \(u3\) | Charlie | 30 | \([\quad]\) |
| \(u4\) | David | 29 | \([u1 \rightarrow 1]\) |
| \(u5\) | Esther | 32 | \([u1 \rightarrow 2]\) |
| \(u6\) | Fanny | 36 | \([\quad]\) |
| \(u7\) | Gabby | 60 | \([\quad]\) |
- \([u1 \rightarrow 0]\): data type is map. It stores a set of pairs (Key: Value)
from graphframes import GraphFrame
# Vertex DataFrame
v = spark.createDataFrame(
[
("u1", "Alice", 34),
("u2", "Bob", 36),
("u3", "Charlie", 30),
("u4", "David", 29),
("u5", "Esther", 32),
("u6", "Fanny", 36),
("u7", "Gabby", 60)
],
["id", "name", "age"]
)
# Edge DataFrame
e = spark.createDataFrame(
[
("u1", "u2", "friend"),
("u2", "u3", "follow"),
("u3", "u2", "follow"),
("u6", "u3", "follow"),
("u5", "u6", "follow"),
("u5", "u4", "friend"),
("u4", "u1", "friend"),
("u1", "u5", "friend")
],
["src", "dst", "relationship"]
)
# Create the graph
g = GraphFrame(v, e)
# Find for each user the length of the shortest path to user u1
shortestPaths = g.shortestPaths(["u1"])
- Find for each user the length of the shortest path to users \(u1\) (Alice) and \(u4\) (David)
- Store the result in a DataFrame
| Vertex | Distance to \(u1\) | Distance to \(u4\) |
|---|---|---|
| \(u1\) | 0 | 2 |
| \(u2\) | - | - |
| \(u3\) | - | - |
| \(u4\) | 1 | 0 |
| \(u5\) | 2 | 1 |
| \(u6\) | - | - |
| \(u7\) | - | - |
The content of the returned DataFrame is the following
| id | name | age | distances |
|---|---|---|---|
| \(u1\) | Alice | 34 | \([u1 \rightarrow 0, u4 \rightarrow 2]\) |
| \(u2\) | Bob | 36 | \([\quad]\) |
| \(u3\) | Charlie | 30 | \([\quad]\) |
| \(u4\) | David | 29 | \([u1 \rightarrow 1, u4 \rightarrow 0]\) |
| \(u5\) | Esther | 32 | \([u1 \rightarrow 2, u4 \rightarrow 1]\) |
| \(u6\) | Fanny | 36 | \([\quad]\) |
| \(u7\) | Gabby | 60 | \([\quad]\) |
- \([u1 \rightarrow 0]\): data type is map. It stores a set of pairs (Key: Value)
from graphframes import GraphFrame
# Vertex DataFrame
v = spark.createDataFrame(
[
("u1", "Alice", 34),
("u2", "Bob", 36),
("u3", "Charlie", 30),
("u4", "David", 29),
("u5", "Esther", 32),
("u6", "Fanny", 36),
("u7", "Gabby", 60)
],
["id", "name", "age"]
)
# Edge DataFrame
e = spark.createDataFrame(
[
("u1", "u2", "friend"),
("u2", "u3", "follow"),
("u3", "u2", "follow"),
("u6", "u3", "follow"),
("u5", "u6", "follow"),
("u5", "u4", "friend"),
("u4", "u1", "friend"),
("u1", "u5", "friend")
],
["src", "dst", "relationship"]
)
# Create the graph
g = GraphFrame(v, e)
# Find for each user the length of the shortest paths to users u1 and u4
shortestPaths = g.shortestPaths(["u1", "u4"])
1.4 Connected components
A connected component of a graph is a subgraph \(sg\) such that
- Any two vertexes in \(sg\) are connected to each other by at least one path
- The set of vertexes in \(sg\) is not connected to any additional vertexes in the original graph
The direction of edges is not considered.
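A minimal plain-Python sketch of this definition, assuming a union-find structure (a technique chosen here for brevity, not necessarily the one GraphFrames uses): every edge merges the groups of its two endpoints regardless of direction, and the groups that remain are the connected components. The function name is hypothetical; the graph is the example graph of these notes.

```python
def connected_components(vertices, edges):
    """Group vertexes into connected components, ignoring edge direction."""
    parent = {v: v for v in vertices}

    def find(v):
        # Follow parent links up to the representative, shortening the path on the way.
        while parent[v] != v:
            parent[v] = parent[parent[v]]
            v = parent[v]
        return v

    for src, dst in edges:
        # An edge merges the groups of its endpoints, whatever its direction.
        parent[find(src)] = find(dst)

    groups = {}
    for v in vertices:
        groups.setdefault(find(v), set()).add(v)
    return list(groups.values())


vertices = ["u1", "u2", "u3", "u4", "u5", "u6", "u7"]
edges = [
    ("u1", "u2"), ("u2", "u3"), ("u3", "u2"), ("u6", "u3"),
    ("u5", "u6"), ("u5", "u4"), ("u4", "u1"), ("u1", "u5"),
]

components = connected_components(vertices, edges)
print(sorted(sorted(c) for c in components))
# [['u1', 'u2', 'u3', 'u4', 'u5', 'u6'], ['u7']]
```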
The connectedComponents() method of the GraphFrame class returns the connected components of the input graph. This is an expensive algorithm, and requires setting a Spark checkpoint directory.
1.4.1 Implementation
The connectedComponents() method returns a DataFrame that contains one record/row for each distinct vertex of the input graph. It is characterized by the following columns
- one column for each attribute of the vertexes
- component (type long). It is the identifier of the connected component to which the current vertex has been assigned.
Print on the stdout the number of connected components of the example graph.
Notice that there are two connected components in this graph.
This is the content of the DataFrame used to store the two identified connected components
| id | name | age | component |
|---|---|---|---|
| \(u6\) | Fanny | 36 | 146028888064 |
| \(u1\) | Alice | 34 | 146028888064 |
| \(u3\) | Charlie | 30 | 146028888064 |
| \(u5\) | Esther | 32 | 146028888064 |
| \(u2\) | Bob | 36 | 146028888064 |
| \(u4\) | David | 29 | 146028888064 |
| \(u7\) | Gabby | 60 | 1546188226560 |
Notice the “component” field
- “146028888064”: vertexes of the first component
- “1546188226560”: vertexes of the second component
from graphframes import GraphFrame
# Vertex DataFrame
v = spark.createDataFrame(
[
("u1", "Alice", 34),
("u2", "Bob", 36),
("u3", "Charlie", 30),
("u4", "David", 29),
("u5", "Esther", 32),
("u6", "Fanny", 36),
("u7", "Gabby", 60)
],
["id", "name", "age"]
)
# Edge DataFrame
e = spark.createDataFrame(
[
("u1", "u2", "friend"),
("u2", "u3", "follow"),
("u3", "u2", "follow"),
("u6", "u3", "follow"),
("u5", "u6", "follow"),
("u5", "u4", "friend"),
("u4", "u1", "friend"),
("u1", "u5", "friend")
],
["src", "dst", "relationship"]
)
# Create the graph
g = GraphFrame(v, e)
# Set checkpoint folder
sc.setCheckpointDir("tmp_ckpts")
# Run the algorithm
connComp=g.connectedComponents()
# Count the number of components
nComp=connComp.select("component").distinct().count()
print("Number of connected components: ", nComp)
1.5 Strongly connected components
A directed subgraph \(sg\) is called strongly connected if every vertex in \(sg\) is reachable from every other vertex in \(sg\). For undirected graphs, connected and strongly connected components are the same.
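To make the definition concrete, the sketch below computes strongly connected components in plain Python with Kosaraju's two-pass depth-first search. This is a classic sequential technique picked for illustration and is not claimed to be what GraphFrames runs; the function name is hypothetical and the recursion is only suitable for small graphs such as the example graph of these notes.

```python
def strongly_connected_components(vertices, edges):
    """Kosaraju's algorithm: DFS on the graph, then DFS on the reversed graph."""
    out_adj = {v: [] for v in vertices}
    in_adj = {v: [] for v in vertices}
    for src, dst in edges:
        out_adj[src].append(dst)
        in_adj[dst].append(src)

    # Pass 1: record vertexes in order of DFS completion on the original graph.
    visited, order = set(), []

    def visit(v):
        visited.add(v)
        for nxt in out_adj[v]:
            if nxt not in visited:
                visit(nxt)
        order.append(v)

    for v in vertices:
        if v not in visited:
            visit(v)

    # Pass 2: DFS on the reversed graph, in decreasing completion order.
    assigned, components = set(), []

    def collect(v, component):
        assigned.add(v)
        component.add(v)
        for prev in in_adj[v]:
            if prev not in assigned:
                collect(prev, component)

    for v in reversed(order):
        if v not in assigned:
            component = set()
            collect(v, component)
            components.append(component)
    return components


vertices = ["u1", "u2", "u3", "u4", "u5", "u6", "u7"]
edges = [
    ("u1", "u2"), ("u2", "u3"), ("u3", "u2"), ("u6", "u3"),
    ("u5", "u6"), ("u5", "u4"), ("u4", "u1"), ("u1", "u5"),
]

sccs = strongly_connected_components(vertices, edges)
print(sorted(sorted(c) for c in sccs))
# [['u1', 'u4', 'u5'], ['u2', 'u3'], ['u6'], ['u7']]
```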
1.5.1 Implementation
The stronglyConnectedComponents() method of the GraphFrame class returns the strongly connected components of the input graph. It is an expensive algorithm (it is better to run it on a cluster with the YARN scheduler, even for small graphs), and it requires setting a Spark checkpoint directory.
stronglyConnectedComponents() returns a DataFrame that contains one record/row for each distinct vertex of the input graph. It is characterized by the following columns
- one column for each attribute of the vertexes
- component (type long). It is the identifier of the strongly connected component to which the current vertex has been assigned.
Print on the stdout the number of strongly connected components of the input graph.
Notice that there are four strongly connected components in this graph.
This is the content of the DataFrame used to store the identified strongly connected components
| id | name | age | component |
|---|---|---|---|
| \(u3\) | Charlie | 30 | 146028888064 |
| \(u2\) | Bob | 36 | 146028888064 |
| \(u1\) | Alice | 34 | 498216206336 |
| \(u5\) | Esther | 32 | 498216206336 |
| \(u4\) | David | 29 | 498216206336 |
| \(u6\) | Fanny | 36 | 1090921693184 |
| \(u7\) | Gabby | 60 | 1546188226560 |
Notice the “component” field
- “146028888064”: vertexes of the first strongly connected component
- “498216206336”: vertexes of the second strongly connected component
- “1090921693184”: vertexes of the third strongly connected component
- “1546188226560”: vertexes of the fourth strongly connected component
from graphframes import GraphFrame
# Vertex DataFrame
v = spark.createDataFrame(
[
("u1", "Alice", 34),
("u2", "Bob", 36),
("u3", "Charlie", 30),
("u4", "David", 29),
("u5", "Esther", 32),
("u6", "Fanny", 36),
("u7", "Gabby", 60)
],
["id", "name", "age"]
)
# Edge DataFrame
e = spark.createDataFrame(
[
("u1", "u2", "friend"),
("u2", "u3", "follow"),
("u3", "u2", "follow"),
("u6", "u3", "follow"),
("u5", "u6", "follow"),
("u5", "u4", "friend"),
("u4", "u1", "friend"),
("u1", "u5", "friend")
],
["src", "dst", "relationship"]
)
# Create the graph
g = GraphFrame(v, e)
# Set checkpoint folder
sc.setCheckpointDir("tmp_ckpts")
# Run the algorithm
strongConnComp = g.stronglyConnectedComponents(maxIter=10)
# Count the number of strongly connected components
nComp=strongConnComp.select("component").distinct().count()
print("Number of strongly connected components: ", nComp)
1.6 Label propagation
Label propagation is an algorithm for detecting communities in graphs. It is similar to clustering, but it exploits connectivity. Convergence is not guaranteed, and the algorithm may end up with trivial solutions.
1.6.1 The Label Propagation algorithm
Each vertex in the network is initially assigned to its own community: at every step, vertexes send their community affiliation to all neighbors and update their state to the mode community affiliation of incoming messages.
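The update rule can be sketched in a few lines of plain Python. This is an illustrative, single-machine version written under two assumptions that the text does not specify: edges deliver messages in both directions, and ties between equally frequent labels are broken by taking the smallest label. The function name `label_propagation` is hypothetical, and the labels it produces are vertex ids rather than the long identifiers returned by GraphFrames.

```python
from collections import Counter


def label_propagation(vertices, edges, max_iter=10):
    """Synchronous label propagation; ties go to the smallest label (assumption)."""
    # Assumption: every edge delivers a message in both directions.
    neighbors = {v: [] for v in vertices}
    for src, dst in edges:
        neighbors[src].append(dst)
        neighbors[dst].append(src)

    labels = {v: v for v in vertices}  # each vertex starts in its own community
    for _ in range(max_iter):
        new_labels = {}
        for v in vertices:
            incoming = Counter(labels[n] for n in neighbors[v])
            if not incoming:
                new_labels[v] = labels[v]  # isolated vertex: no message received
                continue
            best = max(incoming.values())
            # Mode of the incoming labels, with a deterministic tie-break.
            new_labels[v] = min(l for l, c in incoming.items() if c == best)
        labels = new_labels
    return labels


vertices = ["u1", "u2", "u3", "u4", "u5", "u6", "u7"]
edges = [
    ("u1", "u2"), ("u2", "u3"), ("u3", "u2"), ("u6", "u3"),
    ("u5", "u6"), ("u5", "u4"), ("u4", "u1"), ("u1", "u5"),
]

labels = label_propagation(vertices, edges, max_iter=10)
print(labels)
```

Because all vertexes update at the same time, labels can keep oscillating between iterations, which is one way to see why convergence is not guaranteed.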
1.6.2 Implementation
The labelPropagation(maxIter) method of the GraphFrame class runs and returns the result of the label propagation algorithm.
maxIter: number of iterations to run
labelPropagation() returns a DataFrame that contains one record/Row for each distinct vertex of the input graph. It is characterized by the following columns
- one column for each attribute of the vertexes
- label (type long). It is the identifier of the community to which the current vertex has been assigned.
Split the vertexes of the graph into groups by using the label propagation algorithm.
Notice that this is the result returned by one run of the algorithm. Convergence is not guaranteed, and different runs may return different results.
This is the content of the DataFrame used to store the identified communities
| id | name | age | label |
|---|---|---|---|
| \(u3\) | Charlie | 30 | 146028888064 |
| \(u4\) | David | 29 | 498216206336 |
| \(u1\) | Alice | 34 | 498216206336 |
| \(u5\) | Esther | 32 | 498216206337 |
| \(u7\) | Gabby | 60 | 1546188226560 |
| \(u2\) | Bob | 36 | 1606317768704 |
| \(u6\) | Fanny | 36 | 1606317768704 |
- “146028888064”: vertexes of the first community
- “498216206336”: vertexes of the second community
- “1546188226560”: vertexes of the third community
- “1606317768704”: vertexes of the fourth community
from graphframes import GraphFrame
# Vertex DataFrame
v = spark.createDataFrame(
[
("u1", "Alice", 34),
("u2", "Bob", 36),
("u3", "Charlie", 30),
("u4", "David", 29),
("u5", "Esther", 32),
("u6", "Fanny", 36),
("u7", "Gabby", 60)
],
["id", "name", "age"]
)
# Edge DataFrame
e = spark.createDataFrame(
[
("u1", "u2", "friend"),
("u2", "u3", "follow"),
("u3", "u2", "follow"),
("u6", "u3", "follow"),
("u5", "u6", "follow"),
("u5", "u4", "friend"),
("u4", "u1", "friend"),
("u1", "u5", "friend")
],
["src", "dst", "relationship"]
)
# Create the graph
g = GraphFrame(v, e)
# Run the label propagation algorithm
labelComm = g.labelPropagation(maxIter=10)
1.7 PageRank
PageRank is the original, well-known algorithm used by the Google search engine to rank vertexes (web pages) in a graph by order of importance. For the Google search engine, vertexes are web pages in the World Wide Web, and edges are hyperlinks among web pages. The algorithm assigns a numerical weight (importance) to each vertex.
It estimates the likelihood that a person randomly clicking on links will arrive at any particular web page. To have a high PageRank, it is important to:
- Have many in-links
- Be linked by relevant pages (pages characterized by a high PageRank)
The basic idea is that each link vote is proportional to the importance of its source page \(p\): if page \(p\) with importance \(\text{PageRank}(p)\) has \(n\) out-links, each out-link gets \(\frac{\text{PageRank}(p)}{n}\) votes; the importance of page \(p\) is the sum of the votes on its in-links.
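Written as a formula, with \(n_q\) denoting the number of out-links of page \(q\) (a notation introduced here only for illustration), the voting rule above reads:

\[
\text{PageRank}(p) = \sum_{q \,:\, q \rightarrow p} \frac{\text{PageRank}(q)}{n_q}
\]

For instance, a page with importance \(1.0\) and two out-links contributes \(0.5\) votes to each of the two pages it links to.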
1.7.1 Simple recursive formulation
- Initialize each page rank to \(1.0\): for each \(p\) in pages set \(\textbf{PageRank}(p)\) to \(1.0\)
- Iterate for \(max\) iterations
  - Page \(p\) sends a contribution \(\frac{\textbf{PageRank}(p)}{\textbf{numOutLinks}(p)}\) to its neighbors (the pages it links);
  - Update each page rank \(\textbf{PageRank}(p)\) with the sum of the received contributions.
1.7.2 Random jumps formulation
The PageRank algorithm simulates the “random walk” of a user on the web. At each step of the random walk, the random surfer has two options:
- with probability \(1-\alpha\), follow a link at random among the ones in the current page;
- with probability \(\alpha\), jump to a random page.
The algorithm then becomes:
- Initialize each page rank to \(1.0\): for each \(p\) in pages set \(\textbf{PageRank}(p)\) to \(1.0\)
- Iterate for \(max\) iterations
  - Page \(p\) sends a contribution \(\frac{\textbf{PageRank}(p)}{\textbf{numOutLinks}(p)}\) to its neighbors (the pages it links);
  - Update each page rank \(\textbf{PageRank}(p)\) to \(\alpha + (1 - \alpha)\) times the sum of the received contributions.
- \(\alpha=0.15\)
- Initialization: \(\forall{p}, \textbf{PageRank}(p) = 1.0\)
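The iteration described above can be sketched in plain Python as follows. The function `pagerank` is a hypothetical single-machine helper that applies the update rule literally; its values are not claimed to match the output of the GraphFrames pageRank() method, which may normalize ranks differently. Setting `alpha=0` gives the simple recursive formulation.

```python
def pagerank(vertices, edges, alpha=0.15, max_iter=30):
    """Iterate PageRank(p) = alpha + (1 - alpha) * (sum of received contributions)."""
    out_links = {v: [] for v in vertices}
    for src, dst in edges:
        out_links[src].append(dst)

    ranks = {v: 1.0 for v in vertices}  # initialization: PageRank(p) = 1.0
    for _ in range(max_iter):
        received = {v: 0.0 for v in vertices}
        for p in vertices:
            # Page p splits its current rank evenly among the pages it links.
            for q in out_links[p]:
                received[q] += ranks[p] / len(out_links[p])
        ranks = {v: alpha + (1 - alpha) * received[v] for v in vertices}
    return ranks


vertices = ["u1", "u2", "u3", "u4", "u5", "u6", "u7"]
edges = [
    ("u1", "u2"), ("u2", "u3"), ("u3", "u2"), ("u6", "u3"),
    ("u5", "u6"), ("u5", "u4"), ("u4", "u1"), ("u1", "u5"),
]

ranks = pagerank(vertices, edges, alpha=0.15, max_iter=30)
for vertex in sorted(ranks, key=ranks.get, reverse=True):
    print(vertex, round(ranks[vertex], 3))
```

A vertex with no in-links, such as u7 in this graph, receives no contributions and therefore ends with a rank equal to \(\alpha\).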
1.7.3 Implementation
The pageRank() method of the GraphFrame class runs the PageRank algorithm on the input graph.
- resetProbability: probability of resetting to a random vertex (the probability \(\alpha\) associated with random jumps);
- maxIter: if set, the algorithm is run for a fixed number of iterations; it must not be set if the tol parameter is set;
- tol: if set, the algorithm is run until the given tolerance is reached; it must not be set if the maxIter parameter is set;
- sourceId: the source vertex for a personalized PageRank (optional parameter).
pageRank() returns a new GraphFrame that contains the same vertexes and edges as the input graph:
- the vertexes of the new graph are characterized by one new attribute, called “pagerank”, that stores the PageRank of the vertexes;
- the edges of the new graph are characterized by one new attribute, called “weight”, that stores the weight (PageRank contribution) propagated through that edge.
Apply the PageRank algorithm on the following graph and select the user associated with the highest PageRank value.
from graphframes import GraphFrame
# Vertex DataFrame
v = spark.createDataFrame(
[
("u1", "Alice", 34),
("u2", "Bob", 36),
("u3", "Charlie", 30),
("u4", "David", 29),
("u5", "Esther", 32),
("u6", "Fanny", 36),
("u7", "Gabby", 60)
],
["id", "name", "age"]
)
# Edge DataFrame
e = spark.createDataFrame(
[
("u1", "u2", "friend"),
("u2", "u3", "follow"),
("u3", "u2", "follow"),
("u6", "u3", "follow"),
("u5", "u6", "follow"),
("u5", "u4", "friend"),
("u4", "u1", "friend"),
("u1", "u5", "friend")
],
["src", "dst", "relationship"]
)
# Create the graph
g = GraphFrame(v, e)
# Run the PageRank algorithm
pageRanks = g.pageRank(maxIter=30)
# Select the maximum value of PageRank
maxPageRank = pageRanks.vertices \
.agg({"pagerank":"max"}) \
.first()["max(pagerank)"]
# Select the user with the maximum PageRank
pageRanks.vertices \
.filter(pageRanks.vertices.pagerank==maxPageRank) \
.show()
1.8 Custom graph algorithms
GraphFrames provides primitives for developing custom graph algorithms. They are based on a message-passing approach, with two key components:
- aggregateMessages: it sends messages between vertexes and aggregates the messages received by each vertex
- joins: they join the message aggregates with the original graph
For each user, compute the sum of the ages of adjacent users (count many times the same adjacent user if there are many links).
The resulting table is
| Vertex | SumAges |
|---|---|
| \(u1\) | 97 |
| \(u2\) | 94 |
| \(u3\) | 108 |
| \(u4\) | 66 |
| \(u5\) | 99 |
| \(u6\) | 62 |
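The values in the table above can be checked without Spark. The plain-Python sketch below mimics the message-passing idea on the example graph: each edge sends the age of its destination to its source and the age of its source to its destination, and the messages received by each vertex are summed. The dictionary-based layout is an assumption made for this illustration.

```python
ages = {"u1": 34, "u2": 36, "u3": 30, "u4": 29, "u5": 32, "u6": 36, "u7": 60}
edges = [
    ("u1", "u2"), ("u2", "u3"), ("u3", "u2"), ("u6", "u3"),
    ("u5", "u6"), ("u5", "u4"), ("u4", "u1"), ("u1", "u5"),
]

sum_ages = {}
for src, dst in edges:
    # Message to the source: the age of the destination.
    sum_ages[src] = sum_ages.get(src, 0) + ages[dst]
    # Message to the destination: the age of the source.
    sum_ages[dst] = sum_ages.get(dst, 0) + ages[src]

for vertex in sorted(sum_ages):
    print(vertex, sum_ages[vertex])
```

Vertex u7 has no edges, so it receives no message and does not appear in the result, which is consistent with the table.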
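from graphframes import GraphFrame
# AggregateMessages provides the src, dst and msg column references used below
from graphframes.lib import AggregateMessages
# Spark SQL aggregate function (shadows the Python built-in sum in this script)
from pyspark.sql.functions import sum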
# Vertex DataFrame
v = spark.createDataFrame(
[
("u1", "Alice", 34),
("u2", "Bob", 36),
("u3", "Charlie", 30),
("u4", "David", 29),
("u5", "Esther", 32),
("u6", "Fanny", 36),
("u7", "Gabby", 60)
],
["id", "name", "age"]
)
# Edge DataFrame
e = spark.createDataFrame(
[
("u1", "u2", "friend"),
("u2", "u3", "follow"),
("u3", "u2", "follow"),
("u6", "u3", "follow"),
("u5", "u6", "follow"),
("u5", "u4", "friend"),
("u4", "u1", "friend"),
("u1", "u5", "friend")
],
["src", "dst", "relationship"]
)
# Create the graph
g = GraphFrame(v, e)
# For each user, sum the ages of the adjacent users
# Send the age of each destination of an edge to its source
msgToSrc = AggregateMessages.dst["age"]
# Send the age of each source of an edge to its destination
msgToDst = AggregateMessages.src["age"]
# Aggregate messages
aggAge = g.aggregateMessages(
sum(AggregateMessages.msg),
sendToSrc=msgToSrc,
sendToDst=msgToDst
)
#Show result
aggAge.show()